/*
 * UdtMediaCasterConnection.cpp
 *
 *  Created on: 2016年6月23日
 *      Author: terry
 */

#include "UdtMediaCasterConnection.h"
#include "CLog.h"
#include "TStringUtil.h"
#include "Socket.h"
#include "MediaCasterTable.h"
#include <assert.h>
#include <StopWatch.h>
#include "MediaTransportHelper.h"
#include "UdtMediaCasterServer.h"
#include "MediaCasterConfig.h"
#include "StopWatch.h"


namespace av
{

UdtMediaCasterConnection::UdtMediaCasterConnection():
		m_socket(UDT::INVALID_SOCK),
        m_peerAddr(),
		m_peerID(),
		m_header(),
		m_headerReady(),
		m_format(),
		m_server()
{
    m_buffer.ensure(1024 * 500);
}

UdtMediaCasterConnection::~UdtMediaCasterConnection()
{
	close();
}

bool UdtMediaCasterConnection::equals(const std::string& name) const
{
	return (m_streamName == name);
}

bool UdtMediaCasterConnection::open(UdtMediaCasterServer* server, UDTSOCKET fd)
{
	if (fd == UDT::INVALID_SOCK)
	{
		return false;
	}

	m_socket = fd;
	m_server = server;

    int mtu = 1052;
    UDT::setsockopt(m_socket, 0, UDT_MSS, &mtu, sizeof(int));

    //int size = 1024 * 500;
    //UDT::setsockopt(m_socket, 0, UDT_SNDBUF, &size, sizeof(int));

    //size = 1024 * 128;
    //UDT::setsockopt(m_socket, 0, UDP_SNDBUF, &size, sizeof(int));
    

	int len = sizeof(m_peerAddr);
	UDT::getpeername(m_socket, (sockaddr*)&m_peerAddr, &len);
	m_peerID = comn::StringUtil::format("%s:%d", inet_ntoa(m_peerAddr.sin_addr), ntohs(m_peerAddr.sin_port));

	m_fds.push_back(fd);

	return start();
}

void UdtMediaCasterConnection::close()
{
	if (isRunning())
	{
		stop();
	}

	if (m_socket != UDT::INVALID_SOCK)
	{
		UDT::close(m_socket);
		m_socket = UDT::INVALID_SOCK;
	}
}

void UdtMediaCasterConnection::onMediaFormat(const MediaFormat& fmt)
{
	m_format = fmt;
}

void UdtMediaCasterConnection::onMediaPacket(MediaPacketPtr& pkt)
{
	m_pktQueue.push(pkt);
}

void UdtMediaCasterConnection::onMediaEvent(int event)
{
	// pass
}

int UdtMediaCasterConnection::run()
{
	if (!checkRead(1000*5) || !handleCommand())
	{
		handleClose();
		return 0;
	}

	while (!m_canExit)
	{
		MediaPacketPtr pkt;
		m_pktQueue.pop(pkt, -1);

		if (!pkt)
		{
			continue;
		}

        int64_t duration = m_pktQueue.getDuration();
        if (duration > MediaCasterConfig::s_queueDuration)
        {
            size_t count = m_pktQueue.dropUntilKeyFrame();
            CLog::warning("que duration is too much. duration:%d, drop:%d\n", (int)duration, count);
        }

        comn::StopWatch watch;
        if (!sendPacket(pkt))
        {
            break;
        }
        
        if (watch.elapse() > 15)
        {
            CLog::debug("UdtMediaCasterConnection sendPacket elapse:%d\n", watch.elapse());
        }
	}

    handleClose();

	return 0;
}

void UdtMediaCasterConnection::doStop()
{
	m_pktQueue.cancelWait();

	if (m_socket != UDT::INVALID_SOCK)
	{
		UDT::close(m_socket);
		m_socket = UDT::INVALID_SOCK;
	}
}

bool UdtMediaCasterConnection::checkRead(long ms)
{
	std::vector<UDTSOCKET> fds;
	fds.push_back(m_socket);
	//fds.push_back(m_socketPair.getHandle());

	std::vector<UDTSOCKET> rfds;
	std::vector<UDTSOCKET> xfds;
	UDT::selectEx(fds, &rfds, NULL, &xfds, ms);
	return rfds.size() > 0;
}

bool UdtMediaCasterConnection::handleCommand()
{
	int length = UDT::recv(m_socket, (char*)&m_header, sizeof(m_header), 0);
	if (length != sizeof(m_header))
	{
		return false;
	}

	m_buffer.ensure(m_header.length);

	length = UDT::recv(m_socket, (char*)m_buffer.data(), m_header.length, 0);
	if (length != m_header.length)
	{
		return false;
	}

    std::string json(m_buffer.c_str(), m_header.length);
    Json::Value req;
    Json::Reader reader;
    if (!reader.parse(json, req))
    {
        return false;
    }

    std::string name = req["name"].asString();

	CMediaCasterPtr caster;
	MediaCasterTable::instance().find(name, caster);
	//if (!caster)
	{
		UdtMediaCasterConnectionPtr conn = shared_from_this();
		m_server->onAcquireStream(name, conn);

		MediaCasterTable::instance().find(name, caster);
	}

	bool done = false;
	Json::Value resp;

	if (caster)
	{
		done = true;

		m_streamName = name;

		av::MediaSinkPtr sink = shared_from_this();
		caster->addSink(sink);

		av::MediaFormat fmt;
		caster->getFormat(fmt);

		MediaTransportHelper::toJson(fmt, resp);
	}
	else
	{
		MediaTransportHelper::makeError(resp, ENOENT, "no such stream");
	}

	std::string text = comn::StringCast::toString(resp);

	MediaTransportHeader header;
	if (done)
	{
		MediaTransport::setCommand(header, av::kMediaCmdResp);
	}
	else
	{
		MediaTransport::setCommand(header, av::kMediaCmdError);
	}

	header.length = text.size();

    m_buffer.clear();
    m_buffer.write((char*)&header, sizeof(header));
    m_buffer.write(text.c_str(), text.size());

    int ret = UDT::send(m_socket, (char*)m_buffer.data(), m_buffer.length(), 0);

	return done;
}

void UdtMediaCasterConnection::handleClose()
{
	UdtMediaCasterConnectionPtr conn = shared_from_this();
	m_server->remove(conn);
}

UDTSOCKET UdtMediaCasterConnection::getHandle()
{
	return m_socket;
}

const std::string& UdtMediaCasterConnection::getStreamName()
{
	return m_streamName;
}

const sockaddr_in& UdtMediaCasterConnection::getPeerName()
{
    return m_peerAddr;
}

bool UdtMediaCasterConnection::sendPacket(MediaPacketPtr& pkt)
{
	MediaTransportHeader header;
	memset(&header, 0, sizeof(header));
	header.magic = MediaTransport::MAGIC;
	header.tag = kMediaData;
    header.subtype = pkt->type;
    header.length = pkt->size;
    header.timestamp = pkt->pts;
    header.duration = pkt->duration;
    header.flags = pkt->flags;

    m_buffer.clear();
    m_buffer.write((char*)&header, sizeof(header));
    m_buffer.write(pkt->data, pkt->size);

    char* data = (char*)m_buffer.data();
    int length = m_buffer.length();
    while (length > 0)
    {
        int ret = UDT::send(m_socket, data, length, 0);
        if (ret < 0)
        {
            break;
        }
        else
        {
            data += ret;
            length -= ret;
        }
    }

    return length == 0;
}


} /* namespace av */
